// Copyright (C) 2013-2015 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef DEMO_MAIN_LOOP_H_
#define DEMO_MAIN_LOOP_H_

#if !defined (COMMONAPI_INTERNAL_COMPILATION)
#define COMMONAPI_INTERNAL_COMPILATION
#endif

#include <CommonAPI/MainLoopContext.hpp>

#include <vector>
#include <set>
#include <map>
#ifdef _WIN32
#include <WinSock2.h>
#else
#include <poll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#endif


#include <cassert>


namespace CommonAPI {

class MainLoop {
 public:
    MainLoop() = delete;
    MainLoop(const MainLoop&) = delete;
    MainLoop& operator=(const MainLoop&) = delete;
    MainLoop(MainLoop&&) = delete;
    MainLoop& operator=(MainLoop&&) = delete;

    explicit MainLoop(std::shared_ptr<MainLoopContext> context)
                        : context_(context),
                          currentMinimalTimeoutInterval_(TIMEOUT_INFINITE),
                          hasToStop_(false),
                          running_(false),
                          isBroken_(false) {

    #ifdef _WIN32
        WSADATA wsaData;
        int iResult;

        SOCKET ListenSocket = INVALID_SOCKET;

        struct addrinfo *result = NULL;
        struct addrinfo hints;

        // Initialize Winsock
        iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
        if (iResult != 0) {
            printf("WSAStartup failed with error: %d\n", iResult);
        }

        ZeroMemory(&hints, sizeof(hints));
        hints.ai_family = AF_INET;
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_protocol = IPPROTO_TCP;
        hints.ai_flags = AI_PASSIVE;

        // Resolve the server address and port
        iResult = getaddrinfo(NULL, "0", &hints, &result);
        if (iResult != 0) {
            printf("getaddrinfo failed with error: %d\n", iResult);
            WSACleanup();
        }

        // Create a SOCKET for connecting to server
        ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
        if (ListenSocket == INVALID_SOCKET) {
            printf("socket failed with error: %ld\n", WSAGetLastError());
            freeaddrinfo(result);
            WSACleanup();
        }

        // Setup the TCP listening socket
        iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen);
        if (iResult == SOCKET_ERROR) {
            printf("bind failed with error: %d\n", WSAGetLastError());
            freeaddrinfo(result);
            closesocket(ListenSocket);
            WSACleanup();
        }

        sockaddr* connected_addr = new sockaddr();
        USHORT port = 0;
        int namelength = sizeof(sockaddr);
        iResult = getsockname(ListenSocket, connected_addr, &namelength);
        if (iResult == SOCKET_ERROR) {
            printf("getsockname failed with error: %d\n", WSAGetLastError());
        } else if (connected_addr->sa_family == AF_INET) {
            port = ((struct sockaddr_in*)connected_addr)->sin_port;
        }
        delete connected_addr;

        freeaddrinfo(result);

        iResult = listen(ListenSocket, SOMAXCONN);
        if (iResult == SOCKET_ERROR) {
            printf("listen failed with error: %d\n", WSAGetLastError());
            closesocket(ListenSocket);
            WSACleanup();
        }

        wsaData;
        wakeFd_.fd = INVALID_SOCKET;
        struct addrinfo *ptr = NULL;

        // Initialize Winsock
        iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
        if (iResult != 0) {
            printf("WSAStartup failed with error: %d\n", iResult);
        }

        ZeroMemory(&hints, sizeof(hints));
        hints.ai_family = AF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_protocol = IPPROTO_TCP;

        // Resolve the server address and port
        iResult = getaddrinfo("127.0.0.1", std::to_string(ntohs(port)).c_str(), &hints, &result);
        if (iResult != 0) {
            printf("getaddrinfo failed with error: %d\n", iResult);
            WSACleanup();
        }

        // Attempt to connect to an address until one succeeds
        for (ptr = result; ptr != NULL; ptr = ptr->ai_next) {

            // Create a SOCKET for connecting to server
            wakeFd_.fd = socket(ptr->ai_family, ptr->ai_socktype,
                ptr->ai_protocol);
            if (wakeFd_.fd == INVALID_SOCKET) {
                printf("socket failed with error: %ld\n", WSAGetLastError());
                WSACleanup();
            }

            // Connect to server.
            iResult = connect(wakeFd_.fd, ptr->ai_addr, (int)ptr->ai_addrlen);
            if (iResult == SOCKET_ERROR) {
                printf("connect failed with error: %ld\n", WSAGetLastError());
                closesocket(wakeFd_.fd);
                wakeFd_.fd = INVALID_SOCKET;
                continue;
            }
            break;
        }

        freeaddrinfo(result);

        if (wakeFd_.fd == INVALID_SOCKET) {
            printf("Unable to connect to server!\n");
            WSACleanup();
        }

        // Accept a client socket
        sendFd_.fd = accept(ListenSocket, NULL, NULL);
        if (sendFd_.fd == INVALID_SOCKET) {
            printf("accept failed with error: %d\n", WSAGetLastError());
            closesocket(ListenSocket);
            WSACleanup();
        }

        wakeFd_.events = POLLIN;
        registerFileDescriptor(wakeFd_);
    #else
        wakeFd_.fd = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
        wakeFd_.events = POLLIN;
        registerFileDescriptor(wakeFd_);
    #endif

        dispatchSourceListenerSubscription_ = context_->subscribeForDispatchSources(
                std::bind(&MainLoop::registerDispatchSource, this,
                        std::placeholders::_1, std::placeholders::_2),
                std::bind(&MainLoop::unregisterDispatchSource,
                        this, std::placeholders::_1));
        watchListenerSubscription_ = context_->subscribeForWatches(
                std::bind(&MainLoop::registerWatch, this,
                        std::placeholders::_1, std::placeholders::_2),
                std::bind(&MainLoop::unregisterWatch, this,
                        std::placeholders::_1));
        timeoutSourceListenerSubscription_ = context_->subscribeForTimeouts(
                std::bind(&MainLoop::registerTimeout, this,
                        std::placeholders::_1, std::placeholders::_2),
                std::bind(&MainLoop::unregisterTimeout, this,
                        std::placeholders::_1));
        wakeupListenerSubscription_ = context_->subscribeForWakeupEvents(
                std::bind(&MainLoop::wakeup, this));
    }

    ~MainLoop() {
        unregisterFileDescriptor (wakeFd_);

        context_->unsubscribeForDispatchSources(
                dispatchSourceListenerSubscription_);
        context_->unsubscribeForWatches(watchListenerSubscription_);
        context_->unsubscribeForTimeouts(timeoutSourceListenerSubscription_);
        context_->unsubscribeForWakeupEvents(wakeupListenerSubscription_);

    #ifdef _WIN32
        // shutdown the connection since no more data will be sent
        int iResult = shutdown(wakeFd_.fd, SD_SEND);
        if (iResult == SOCKET_ERROR) {
            printf("shutdown failed with error: %d\n", WSAGetLastError());
            closesocket(wakeFd_.fd);
            WSACleanup();
        }

        // cleanup
        closesocket(wakeFd_.fd);
        WSACleanup();
    #else
        close(wakeFd_.fd);
    #endif

        cleanup();
    }

    /**
     * \brief Runs the mainloop indefinitely until stop() is called.
     *
     * Runs the mainloop indefinitely until stop() is called. The given timeout (milliseconds)
     * will be overridden if a timeout-event is present that defines an earlier ready time.
     */
    void run(const int64_t& timeoutInterval = TIMEOUT_INFINITE) {
        running_ = true;
         hasToStop_ = false;

         while(!hasToStop_) {
             doSingleIteration(timeoutInterval);
         }
         running_ = false;
    }

    void stop() {
        hasToStop_ = true;
        wakeup();
    }

    /**
     * \brief Executes a single cycle of the mainloop.
     *
     * Subsequently calls prepare(), poll(), check() and, if necessary, dispatch().
     * The given timeout (milliseconds) represents the maximum time
     * this iteration will remain in the poll state. All other steps
     * are handled in a non-blocking way. Note however that a source
     * might claim to have infinite amounts of data to dispatch.
     * This demo-implementation of a Mainloop will dispatch a source
     * until it no longer claims to have data to dispatch.
     * Dispatch will not be called if no sources, watches and timeouts
     * claim to be ready during the check()-phase.
     *
     * @param timeout The maximum poll-timeout for this iteration.
     */
    void doSingleIteration(const int64_t& timeout = TIMEOUT_INFINITE) {
        {
            std::lock_guard<std::recursive_mutex> itsLock(dispatchSourcesMutex_);
            for (auto dispatchSourceIterator = registeredDispatchSources_.begin();
                dispatchSourceIterator != registeredDispatchSources_.end();
                dispatchSourceIterator++) {

                if ((dispatchSourceIterator->second)->deleteObject_) {
                    if (!(dispatchSourceIterator->second)->isExecuted_) {
                        bool contained = false;
                        for (std::set<std::pair<DispatchPriority, DispatchSourceToDispatchStruct*>>::iterator dispatchSourceIteratorInner = sourcesToDispatch_.begin();
                            dispatchSourceIteratorInner != sourcesToDispatch_.end(); dispatchSourceIteratorInner++) {
                            if (std::get<1>(*dispatchSourceIteratorInner)->dispatchSource_ == (dispatchSourceIterator->second)->dispatchSource_) {
                                contained = true;
                                break;
                            }
                        }
                        if (!contained) {
                            delete (dispatchSourceIterator->second)->dispatchSource_;
                            (dispatchSourceIterator->second)->dispatchSource_ = NULL;
                            delete dispatchSourceIterator->second;
                            dispatchSourceIterator = registeredDispatchSources_.erase(dispatchSourceIterator);
                        }
                        if (dispatchSourceIterator == registeredDispatchSources_.end()) {
                            break;
                        }
                    }
                }
            }
        }

        {
            std::lock_guard<std::mutex> itsLock(timeoutsMutex_);
            for (auto timeoutIterator = registeredTimeouts_.begin();
                timeoutIterator != registeredTimeouts_.end();
                timeoutIterator++) {

                if ((timeoutIterator->second)->deleteObject_) {
                    if (!(timeoutIterator->second)->isExecuted_) {
                        bool contained = false;
                        for (std::set<std::pair<DispatchPriority, TimeoutToDispatchStruct*>>::iterator timeoutIteratorInner = timeoutsToDispatch_.begin();
                            timeoutIteratorInner != timeoutsToDispatch_.end(); timeoutIteratorInner++) {
                            if (std::get<1>(*timeoutIteratorInner)->timeout_ == (timeoutIterator->second)->timeout_) {
                                contained = true;
                                break;
                            }
                        }
                        if (!contained) {
                            delete (timeoutIterator->second)->timeout_;
                            (timeoutIterator->second)->timeout_ = NULL;
                            delete timeoutIterator->second;
                            timeoutIterator = registeredTimeouts_.erase(timeoutIterator);
                        }
                        if (timeoutIterator == registeredTimeouts_.end()) {
                            break;
                        }
                    }
                }
            }
        }

        {
            std::lock_guard<std::mutex> itsLock(watchesMutex_);
            for (auto watchesIterator = registeredWatches_.begin();
                watchesIterator != registeredWatches_.end();
                watchesIterator++) {

                if ((watchesIterator->second)->deleteObject_) {
                    if (!(watchesIterator->second)->isExecuted_) {
                        bool contained = false;
                        for (auto watchesIteratorInner = watchesToDispatch_.begin();
                            watchesIteratorInner != watchesToDispatch_.end(); watchesIteratorInner++) {
                            if (std::get<1>(*watchesIteratorInner)->watch_ == (watchesIterator->second)->watch_) {
                                contained = true;
                                break;
                            }
                        }
                        if (!contained) {
                            delete (watchesIterator->second)->watch_;
                            (watchesIterator->second)->watch_ = NULL;
                            delete watchesIterator->second;
                            watchesIterator = registeredWatches_.erase(watchesIterator);
                        }
                        if (watchesIterator == registeredWatches_.end()) {
                            break;
                        }
                    }
                }
            }
        }

        if (prepare(timeout)) {
            dispatch();
        } else {
            poll();
            if (check()) {
                dispatch();
            }
        }
    }

    /*
     * The given timeout is a maximum timeout in ms, measured from the current time in the future
     * (a value of 0 means "no timeout"). It will be overridden if a timeout-event is present
     * that defines an earlier ready time.
     */
    bool prepare(const int64_t& timeout = TIMEOUT_INFINITE) {
        currentMinimalTimeoutInterval_ = timeout;

        {
            std::lock_guard<std::recursive_mutex> itsLock(dispatchSourcesMutex_);
            for (auto dispatchSourceIterator = registeredDispatchSources_.begin();
                    dispatchSourceIterator != registeredDispatchSources_.end();
                    dispatchSourceIterator++) {

                int64_t dispatchTimeout = TIMEOUT_INFINITE;

                if (!(dispatchSourceIterator->second)->deleteObject_ &&
                        (dispatchSourceIterator->second)->dispatchSource_->prepare(dispatchTimeout)) {
                    sourcesToDispatch_.insert(*dispatchSourceIterator);
                } else if (dispatchTimeout > 0 && dispatchTimeout < currentMinimalTimeoutInterval_) {
                    currentMinimalTimeoutInterval_ = dispatchTimeout;
                }
            }
        }

        int64_t currentContextTime = getCurrentTimeInMs();

        {
            std::lock_guard<std::mutex> itsLock(timeoutsMutex_);
            for (auto timeoutPriorityRange = registeredTimeouts_.begin();
                    timeoutPriorityRange != registeredTimeouts_.end();
                    timeoutPriorityRange++) {

                if (!(timeoutPriorityRange->second)->deleteObject_) {
                    if (!(timeoutPriorityRange->second)->timeoutElapsed_) { // check that timeout is not elapsed
                        int64_t intervalToReady = (timeoutPriorityRange->second)->timeout_->getReadyTime()
                            - currentContextTime;

                        if (intervalToReady <= 0) {
                            // set information that timeout is elapsed
                            (timeoutPriorityRange->second)->timeoutElapsed_ = true;

                            timeoutsToDispatch_.insert(*timeoutPriorityRange);
                            currentMinimalTimeoutInterval_ = TIMEOUT_NONE;
                        } else if (intervalToReady < currentMinimalTimeoutInterval_) {
                            currentMinimalTimeoutInterval_ = intervalToReady;
                        }
                    }
                }
            }
        }

        return (!sourcesToDispatch_.empty() || !timeoutsToDispatch_.empty());
    }

    void poll() {
        int managedFileDescriptorOffset = 0;
        {
            std::lock_guard<std::mutex> itsLock(fileDescriptorsMutex_);
            for (auto fileDescriptor = managedFileDescriptors_.begin() + managedFileDescriptorOffset; fileDescriptor != managedFileDescriptors_.end(); ++fileDescriptor) {
                (*fileDescriptor).revents = 0;
            }
        }

    #ifdef _WIN32
        int numReadyFileDescriptors = WSAPoll(&managedFileDescriptors_[0], managedFileDescriptors_.size(), int(currentMinimalTimeoutInterval_));
    #else
        int numReadyFileDescriptors = ::poll(&(managedFileDescriptors_[0]),
                managedFileDescriptors_.size(), int(currentMinimalTimeoutInterval_));
    #endif
        if (!numReadyFileDescriptors) {
            int64_t currentContextTime = getCurrentTimeInMs();

            {
                std::lock_guard<std::mutex> itsLock(timeoutsMutex_);
                for (auto timeoutPriorityRange = registeredTimeouts_.begin();
                        timeoutPriorityRange != registeredTimeouts_.end();
                        timeoutPriorityRange++) {

                    if (!(timeoutPriorityRange->second)->deleteObject_) {
                        if (!(timeoutPriorityRange->second)->timeoutElapsed_) { // check that timeout is not elapsed
                            int64_t intervalToReady =
                                (timeoutPriorityRange->second)->timeout_->getReadyTime()
                                    - currentContextTime;

                            if (intervalToReady <= 0) {
                                // set information that timeout is elapsed
                                (timeoutPriorityRange->second)->timeoutElapsed_ = true;

                                timeoutsToDispatch_.insert(*timeoutPriorityRange);
                            }
                        }
                    }
                }
            }
        }

        // If the wakeup descriptor woke us up, we must acknowledge
        if (managedFileDescriptors_[0].revents) {
            wakeupAck();
        }
    }

    bool check() {
        int managedFileDescriptorOffset = 1;
        {
            std::lock_guard<std::mutex> itsLock(fileDescriptorsMutex_);
            for (auto fileDescriptor = managedFileDescriptors_.begin() + managedFileDescriptorOffset;
                    fileDescriptor != managedFileDescriptors_.end(); ++fileDescriptor) {
                {
                    std::lock_guard<std::mutex> itsWatchesLock(watchesMutex_);
                    for (auto registeredWatchIterator = registeredWatches_.begin();
                            registeredWatchIterator != registeredWatches_.end();
                            registeredWatchIterator++) {

                        if (!(registeredWatchIterator->second)->deleteObject_) {
                            if ((registeredWatchIterator->second)->fd_ == fileDescriptor->fd
                                    && fileDescriptor->revents) {
                                watchesToDispatch_.insert(*registeredWatchIterator);
                            }
                        }
                    }
                }
            }
        }

        {
            std::lock_guard<std::recursive_mutex> itsLock(dispatchSourcesMutex_);
            for (auto dispatchSourceIterator = registeredDispatchSources_.begin();
                    dispatchSourceIterator != registeredDispatchSources_.end();
                    ++dispatchSourceIterator) {

                if (!(dispatchSourceIterator->second)->deleteObject_ &&
                        dispatchSourceIterator->second->dispatchSource_->check()) {
                    sourcesToDispatch_.insert(*dispatchSourceIterator);
                }
            }
        }

        return (!timeoutsToDispatch_.empty() ||
                !watchesToDispatch_.empty() ||
                !sourcesToDispatch_.empty());
    }

    void dispatch() {
        dispatchTimeouts();
        dispatchWatches();
        dispatchSources();
    }

    void wakeup() {
    #ifdef _WIN32
        // Send an initial buffer
        char *sendbuf = "1";

        int iResult = send(sendFd_.fd, sendbuf, (int)strlen(sendbuf), 0);
        if (iResult == SOCKET_ERROR) {
            int error = WSAGetLastError();

            if (error != WSANOTINITIALISED) {
                printf("send failed with error: %d\n", error);
            }
        }
    #else
        int64_t wake = 1;
        if(::write(wakeFd_.fd, &wake, sizeof(int64_t)) == -1) {
            std::perror("MainLoop::wakeup");
        }
    #endif
    }

    void wakeupAck() {
    #ifdef _WIN32
        // Receive until the peer closes the connection
        int iResult;
        char recvbuf[DEFAULT_BUFLEN];
        int recvbuflen = DEFAULT_BUFLEN;

        iResult = recv(wakeFd_.fd, recvbuf, recvbuflen, 0);
        if (iResult > 0) {
            //printf("Bytes received from %d: %d\n", wakeFd_.fd, iResult);
        }
        else if (iResult == 0) {
            printf("Connection closed\n");
        }
        else {
            printf("recv failed with error: %d\n", WSAGetLastError());
        }
    #else
        int64_t buffer;
        while(::read(wakeFd_.fd, &buffer, sizeof(int64_t)) == sizeof(buffer));
    #endif
    }

    void dispatchTimeouts() {
        if (timeoutsToDispatch_.size() > 0)
        {
            for (auto timeoutIterator = timeoutsToDispatch_.begin();
                    timeoutIterator != timeoutsToDispatch_.end(); timeoutIterator++) {
                auto timeoutToDispatchStruct = std::get<1>(*timeoutIterator);
                if (!timeoutToDispatchStruct->deleteObject_) {
                    timeoutToDispatchStruct->isExecuted_ = true;
                    timeoutToDispatchStruct->timeout_->dispatch();
                    timeoutToDispatchStruct->isExecuted_ = false;
                }
            }
            timeoutsToDispatch_.clear();
        }
    }

    void dispatchWatches() {
        if (watchesToDispatch_.size() > 0)
        {
            for (auto watchIterator = watchesToDispatch_.begin();
                    watchIterator != watchesToDispatch_.end(); watchIterator++) {
                auto watchToDispatchStruct = std::get<1>(*watchIterator);
                if (!watchToDispatchStruct->deleteObject_) {
                    watchToDispatchStruct->isExecuted_ = true;
                    Watch* watch = watchToDispatchStruct->watch_;
                    const unsigned int flags = (unsigned int)(watch->getAssociatedFileDescriptor().events);
                    watch->dispatch(flags);
                    watchToDispatchStruct->isExecuted_ = false;
                }
            }
            watchesToDispatch_.clear();
        }
    }

    void dispatchSources() {
        if (sourcesToDispatch_.size() > 0)
        {
            isBroken_ = false;
            for (auto dispatchSourceIterator = sourcesToDispatch_.begin();
                    dispatchSourceIterator != sourcesToDispatch_.end() && !isBroken_;
                    dispatchSourceIterator++) {
                auto dispatchSourceToDispatchStruct = std::get<1>(*dispatchSourceIterator);
                if (!dispatchSourceToDispatchStruct->deleteObject_) {
                    dispatchSourceToDispatchStruct->isExecuted_ = true;
                    bool deleteObject = dispatchSourceToDispatchStruct->deleteObject_;
                    while(!deleteObject &&
                            dispatchSourceToDispatchStruct->dispatchSource_->dispatch());
                    dispatchSourceToDispatchStruct->isExecuted_ = false;
                }
            }
            sourcesToDispatch_.clear();
        }
    }

 private:

    void cleanup() {
        {
            std::lock_guard<std::recursive_mutex> itsLock(dispatchSourcesMutex_);
            for (auto dispatchSourceIterator = registeredDispatchSources_.begin();
                dispatchSourceIterator != registeredDispatchSources_.end();) {

                delete (dispatchSourceIterator->second)->dispatchSource_;
                (dispatchSourceIterator->second)->dispatchSource_ = NULL;
                delete dispatchSourceIterator->second;
                dispatchSourceIterator = registeredDispatchSources_.erase(dispatchSourceIterator);
            }
        }

        {
            std::lock_guard<std::mutex> itsLock(timeoutsMutex_);
            for (auto timeoutIterator = registeredTimeouts_.begin();
                timeoutIterator != registeredTimeouts_.end();) {

                delete (timeoutIterator->second)->timeout_;
                (timeoutIterator->second)->timeout_ = NULL;
                delete timeoutIterator->second;
                timeoutIterator = registeredTimeouts_.erase(timeoutIterator);
            }
        }

        {
            std::lock_guard<std::mutex> itsLock(watchesMutex_);
            for (auto watchesIterator = registeredWatches_.begin();
                watchesIterator != registeredWatches_.end();) {

                delete (watchesIterator->second)->watch_;
                (watchesIterator->second)->watch_ = NULL;
                delete watchesIterator->second;
                watchesIterator = registeredWatches_.erase(watchesIterator);
            }
        }
    }

    void registerFileDescriptor(const pollfd& fileDescriptor) {
        std::lock_guard<std::mutex> itsLock(fileDescriptorsMutex_);
        managedFileDescriptors_.push_back(fileDescriptor);
    }

    void unregisterFileDescriptor(const pollfd& fileDescriptor) {
        wakeup();
        std::lock_guard<std::mutex> itsLock(fileDescriptorsMutex_);
        for (auto it = managedFileDescriptors_.begin();
            it != managedFileDescriptors_.end(); it++) {
            if ((*it).fd == fileDescriptor.fd && (*it).events == fileDescriptor.events) {
                managedFileDescriptors_.erase(it);
                break;
            }
        }
    }

    void registerDispatchSource(DispatchSource* dispatchSource, const DispatchPriority dispatchPriority) {
        DispatchSourceToDispatchStruct* dispatchSourceStruct =
                new DispatchSourceToDispatchStruct(dispatchSource,
                        false, false);
        std::lock_guard<std::recursive_mutex> itsLock(dispatchSourcesMutex_);
        registeredDispatchSources_.insert(
            { dispatchPriority, dispatchSourceStruct });
    }

    void unregisterDispatchSource(DispatchSource* dispatchSource) {
        {
            std::lock_guard<std::recursive_mutex> itsLock(dispatchSourcesMutex_);
            for (auto dispatchSourceIterator = registeredDispatchSources_.begin();
                dispatchSourceIterator != registeredDispatchSources_.end();
                dispatchSourceIterator++) {

                if ((dispatchSourceIterator->second)->dispatchSource_ == dispatchSource){
                    (dispatchSourceIterator->second)->deleteObject_ = true;
                    break;
                }
            }
            isBroken_ = true;
        }
    }

    void registerWatch(Watch* watch, const DispatchPriority dispatchPriority) {

        pollfd fdToRegister = watch->getAssociatedFileDescriptor();

        registerFileDescriptor(fdToRegister);

        std::lock_guard<std::mutex> itsLock(watchesMutex_);

        WatchToDispatchStruct* watchStruct = new WatchToDispatchStruct(fdToRegister.fd, watch, false, false);
        registeredWatches_.insert({ dispatchPriority, watchStruct});
    }

    void unregisterWatch(Watch* watch) {
        std::lock_guard<std::mutex> itsLock(watchesMutex_);

        unregisterFileDescriptor(watch->getAssociatedFileDescriptor());

        for (auto watchIterator = registeredWatches_.begin();
            watchIterator != registeredWatches_.end(); watchIterator++) {

            if ((watchIterator->second)->watch_ == watch) {
                (watchIterator->second)->deleteObject_ = true;
                break;
            }
        }
    }

    void registerTimeout(Timeout* timeout, const DispatchPriority dispatchPriority) {
        TimeoutToDispatchStruct* timeoutStruct = new TimeoutToDispatchStruct(
                timeout, false, false, false);
        std::lock_guard<std::mutex> itsLock(timeoutsMutex_);
        registeredTimeouts_.insert(
            { dispatchPriority, timeoutStruct });
    }

    void unregisterTimeout(Timeout* timeout) {
        std::lock_guard<std::mutex> itsLock(timeoutsMutex_);
        for (auto timeoutIterator = registeredTimeouts_.begin();
                timeoutIterator != registeredTimeouts_.end();
                timeoutIterator++) {

            if ((timeoutIterator->second)->timeout_ == timeout) {
                (timeoutIterator->second)->deleteObject_ = true;
                break;
            }
        }
    }

    std::shared_ptr<MainLoopContext> context_;

    std::vector<pollfd> managedFileDescriptors_;
    std::mutex fileDescriptorsMutex_;

    struct DispatchSourceToDispatchStruct {
        DispatchSource* dispatchSource_;
        std::atomic<bool> isExecuted_; /* execution flag: indicates, whether the dispatchSource is dispatched currently */
        std::atomic<bool> deleteObject_; /* delete flag: indicates, whether the dispatchSource can be deleted*/

        DispatchSourceToDispatchStruct(DispatchSource* _dispatchSource,
            bool _isExecuted,
            bool _deleteObject) {
                dispatchSource_ = _dispatchSource;
                isExecuted_ = _isExecuted;
                deleteObject_ = _deleteObject;
        }
    };

    struct TimeoutToDispatchStruct {
        Timeout* timeout_;
        std::atomic<bool> isExecuted_; /* execution flag: indicates, whether the timeout is dispatched currently */
        std::atomic<bool> deleteObject_; /* delete flag: indicates, whether the timeout can be deleted*/
        std::atomic<bool> timeoutElapsed_; /* timeout elapsed flag: indicates, whether the timeout is elapsed*/

        TimeoutToDispatchStruct(Timeout* _timeout,
            bool _isExecuted,
            bool _deleteObject,
            bool _timeoutElapsed) {
                timeout_ = _timeout;
                isExecuted_ = _isExecuted;
                deleteObject_ = _deleteObject;
                timeoutElapsed_ = _timeoutElapsed;
        }
    };

    struct WatchToDispatchStruct {
        int fd_;
        Watch* watch_;
        std::atomic<bool> isExecuted_; /* execution flag: indicates, whether the watch is dispatched currently */
        std::atomic<bool> deleteObject_; /* delete flag: indicates, whether the watch can be deleted*/

        WatchToDispatchStruct(int _fd,
            Watch* _watch,
            bool _isExecuted,
            bool _deleteObject) {
                fd_ = _fd;
                watch_ = _watch;
                isExecuted_ = _isExecuted;
                deleteObject_ = _deleteObject;
        }
    };

    std::multimap<DispatchPriority, DispatchSourceToDispatchStruct*> registeredDispatchSources_;
    std::multimap<DispatchPriority, WatchToDispatchStruct*> registeredWatches_;
    std::multimap<DispatchPriority, TimeoutToDispatchStruct*> registeredTimeouts_;

    std::recursive_mutex dispatchSourcesMutex_;
    std::mutex watchesMutex_;
    std::mutex timeoutsMutex_;

    std::set<std::pair<DispatchPriority, DispatchSourceToDispatchStruct*>> sourcesToDispatch_;
    std::set<std::pair<DispatchPriority, WatchToDispatchStruct*>> watchesToDispatch_;
    std::set<std::pair<DispatchPriority, TimeoutToDispatchStruct*>> timeoutsToDispatch_;

    DispatchSourceListenerSubscription dispatchSourceListenerSubscription_;
    WatchListenerSubscription watchListenerSubscription_;
    TimeoutSourceListenerSubscription timeoutSourceListenerSubscription_;
    WakeupListenerSubscription wakeupListenerSubscription_;

    int64_t currentMinimalTimeoutInterval_;
    bool breakLoop_;
    bool hasToStop_;
    bool running_;

#ifdef _WIN32
    pollfd sendFd_;
#endif

    pollfd wakeFd_;

    std::atomic<bool> isBroken_;

    std::promise<bool>* stopPromise;
};


} // namespace CommonAPI

#endif /* DEMO_MAIN_LOOP_H_ */
